# -*- coding: utf-8 -*-
import os
os.chdir("/home/cloudops/spark")
print(os.getcwd())  # Verify the working directory
# Create a SQL Context from Spark Context
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
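# NOTE: In Spark 2.x+, SparkSession replaces SQLContext as the entry
# point; a minimal sketch of the equivalent setup (assumes Spark 2.x):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("demo").getOrCreate()
# The sqlContext calls below then become spark.read..., spark.sql(...)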
#============================
# Working with Data Frames
#============================
# Create a Data Frame from a JSON file
# Example:
# { "name": "Benjamin Garrison",
# "gender": "male",
# "deptid": "100",
# "age": "32",
# "salary": "3000" }
empDf = sqlContext.read.json("data/customerData.json")
empDf.show()
# +---+------+------+-----------------+------+
# |age|deptId|gender|             name|salary|
# +---+------+------+-----------------+------+
# | 32|   100|  male|Benjamin Garrison|  3000|
# | 40|   200|  male|    Holland Drake|  4500|
# | 26|   100|  male|  Burks Velasquez|  2700|
# | 51|   100|female|    June Rutledge|  4300|
# | 44|   200|  male|    Nielsen Knapp|  6500|
# +---+------+------+-----------------+------+
empDf.printSchema()
# root
# |-- age: string (nullable = true)
# |-- deptId: string (nullable = true)
# |-- gender: string (nullable = true)
# |-- name: string (nullable = true)
# |-- salary: string (nullable = true)
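# All columns were inferred as strings; a sketch of casting the numeric
# columns explicitly (the target types here are an assumption):
from pyspark.sql.functions import col
empTypedDf = empDf.withColumn("age", col("age").cast("int")) \
                  .withColumn("salary", col("salary").cast("int"))
empTypedDf.printSchema()
# age and salary now show as: integer (nullable = true)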
# Query with DataFrame operations
empDf.select("name", "salary").show()
# +-----------------+------+
# |             name|salary|
# +-----------------+------+
# |Benjamin Garrison|  3000|
# . . .
empDf.filter(empDf["age"] == 40).show()
# +---+------+------+-------------+------+
# |age|deptId|gender|         name|salary|
# +---+------+------+-------------+------+
# | 40|   200|  male|Holland Drake|  4500|
# +---+------+------+-------------+------+
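# filter() also accepts a SQL expression string; an equivalent form
# of the filter above:
empDf.filter("age = 40").show()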
empDf.groupBy("gender").count().show()
# +------+-----+
# |gender|count|
# +------+-----+
# |female|    1|
# |  male|    4|
# +------+-----+
empDf.groupBy("deptId").\
agg({"salary": "avg", "age": "max"}).show()
# +------+------------------+--------+
# |deptId|       avg(salary)|max(age)|
# +------+------------------+--------+
# |   200|            5500.0|      44|
# |   100|3333.3333333333335|      51|
# +------+------------------+--------+
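# The dict form of agg() cannot rename result columns; a sketch of the
# same aggregation via pyspark.sql.functions with aliases (max is
# imported as max_ to avoid shadowing the Python builtin):
from pyspark.sql.functions import avg, max as max_
empDf.groupBy("deptId") \
     .agg(avg("salary").alias("avgSalary"),
          max_("age").alias("maxAge")).show()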
# =====================================
# Create a Data Frame from a List
# =====================================
# NOTES:
# 1. The id field is deliberately not named deptId; a duplicate
#    column name would make the JOIN references below ambiguous
# 2. Column names are case-insensitive in Spark SQL queries
deptList = [{'deptName': 'Sales', 'id': "100"},
            {'deptName': 'Engineering', 'id': "200"}]
deptDf = sqlContext.createDataFrame(deptList)
deptDf.show()
# +-----------+---+
# |   deptName| id|
# +-----------+---+
# |      Sales|100|
# |Engineering|200|
# +-----------+---+
# =====================================
# JOIN the Data Frames
# =====================================
empDf.join(deptDf, empDf.deptId == deptDf.id).show()
# =====================================
# Cascading operations
# =====================================
# (If deptDf's id column were also named deptId, this chain would fail
# with: Reference 'deptId' is ambiguous -- see the alias sketch below)
empDf.filter(empDf["age"] > 30).join(deptDf,\
empDf.deptId == deptDf.id).\
groupBy("deptId").\
agg({"salary": "avg", "age": "max"}).show()
# +------+-----------+--------+
# |deptId|avg(salary)|max(age)|
# +------+-----------+--------+
# |   200|     5500.0|      44|
# |   100|     3650.0|      51|
# +------+-----------+--------+
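# If both frames shared a column name, the groupBy above would fail with
# "Reference 'deptId' is ambiguous"; a sketch of disambiguating with
# DataFrame aliases (the alias names e and d are illustrative):
from pyspark.sql.functions import col
e, d = empDf.alias("e"), deptDf.alias("d")
e.join(d, col("e.deptId") == col("d.id")) \
 .groupBy(col("e.deptId")) \
 .agg({"salary": "avg", "age": "max"}).show()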
# =====================================
# Register a Data Frame as table and
# run SQL statements against it
# =====================================
# (In Spark 2.x, createOrReplaceTempView replaces registerTempTable)
empDf.registerTempTable("employees")
deptDf.registerTempTable("departments")
sqlContext.sql("SELECT * FROM employees WHERE salary > 4000").show()
sqlContext.sql("SELECT * FROM departments").show()
# +---+------+------+-------------+------+
# |age|deptId|gender| name|salary|
# +---+------+------+-------------+------+
# | 40| 200| male|Holland Drake| 4500|
# | 51| 100|female|June Rutledge| 4300|
# | 44| 200| male|Nielsen Knapp| 6500|
# +---+------+------+-------------+------+
sqlContext.sql("SELECT e.deptId, \
AVG(e.salary) AS avgSalary, \
MAX(e.age) AS maxAge \
FROM employees e \
JOIN departments d \
ON e.deptId == d.id \
WHERE e.age > 30 \
GROUP BY e.deptId").show()
# +------+---------+------+
# |deptId|avgSalary|maxAge|
# +------+---------+------+
# |   200|   5500.0|    44|
# |   100|   3650.0|    51|
# +------+---------+------+
# =====================================
# Pandas (Data Frame)
# =====================================
# Pandas >= 0.19.2 must be installed
empPandas = empDf.toPandas()
for index, row in empPandas.iterrows():
    print(row["salary"])
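# The conversion also runs in reverse; a minimal sketch of building a
# Spark DataFrame back from the pandas frame:
empDf2 = sqlContext.createDataFrame(empPandas)
empDf2.show()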
# =====================================
# Database (JDBC -> MySQL)
# =====================================
# Make sure that the Spark classpath is set appropriately in the
# spark-defaults.conf file to include the MySQL JDBC driver JAR
# NOTES:
# 1. dbtable can also be a subquery, e.g. "(SELECT ...) AS t"
# 2. DB driver: a JAR file (here kept in the //python/lib directory)
# mysql -u admin -D wp -p
# > show tables;
# > select * from wp_terms;
# In Spark 2.x: wpTermsDf = spark.read.format("jdbc").options(...)
wpTermsDf = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/wp",
    driver="com.mysql.jdbc.Driver",
    dbtable="wp_terms",
    user="admin",
    password="blueC0aT").load()
wpTermsDf.show()
# +-------+--------------------+--------------------+----------+
# |term_id|                name|                slug|term_group|
# +-------+--------------------+--------------------+----------+
# |      1|       Uncategorized|       uncategorized|         0|
# |      2|              simple|              simple|         0|
# |      3|             grouped|             grouped|         0|
# |      4|            variable|            variable|         0|
# . . .
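# Writing back over JDBC mirrors the read; a sketch (the target table
# "wp_terms_copy" is illustrative, not part of the source database):
wpTermsDf.write.jdbc(
    url="jdbc:mysql://localhost:3306/wp",
    table="wp_terms_copy",
    mode="append",
    properties={"user": "admin",
                "password": "blueC0aT",
                "driver": "com.mysql.jdbc.Driver"})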
# =====================================
# Creating data frames from RDD
# =====================================
from pyspark.sql import Row
lines = sc.textFile("auto-data.csv")
# Remove the header line (it contains the literal "FUELTYPE")
dataLines = lines.filter(lambda x: "FUELTYPE" not in x)
dataLines.count()
# Split each CSV record and map it to a Row (make, body style, horsepower)
parts = dataLines.map(lambda l: l.split(","))
autoMap = parts.map(lambda p: \
    Row(make=p[0], body=p[4], hp=int(p[7])))
autoMap.collect()
# [Row(body='hatchback', hp=69, make='subaru'),
# Row(body='hatchback', hp=48, make='chevrolet'),
# Row(body='hatchback', hp=68, make='mazda'),
# Row(body='hatchback', hp=62, make='toyota'),
# . . .
# Infer the schema, and register the DataFrame as a table
autoDf = sqlContext.createDataFrame(autoMap)
autoDf.registerTempTable("autos")
sqlContext.sql("SELECT * FROM autos WHERE hp > 200").show()
# +-----------+---+-------+
# |       body| hp|   make|
# +-----------+---+-------+
# |    hardtop|207|porsche|
# |    hardtop|207|porsche|
# |      sedan|262| jaguar|
# |convertible|207|porsche|
# +-----------+---+-------+
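# Schema inference from Row objects can be replaced with an explicit
# schema; a sketch using StructType (the field types are an assumption):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
autoSchema = StructType([
    StructField("make", StringType(), True),
    StructField("body", StringType(), True),
    StructField("hp", IntegerType(), True)])
autoDf2 = sqlContext.createDataFrame(
    parts.map(lambda p: (p[0], p[4], int(p[7]))), autoSchema)
autoDf2.printSchema()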